iT邦幫忙

2023 iThome 鐵人賽

DAY 22
0

前面講了一些基本概念,我們來看一個比較完整的案例吧

講到 Streaming,有一個很好用的工具叫 Kafka,應該也是大多數工程師或公司會用的工具。所以我們來寫一個 Flink job,每5秒將讀到的文字累加後輸出到 Redis.

取得資料

public class KafkaToRedisExample {

    public static void main(String[] args) throws Exception {
        // 設定Flink執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 設置Kafka配置
        String kafkaBootstrapServers = "localhost:9092";
        String kafkaTopic = "your-kafka-topic";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBootstrapServers);

        // 設定 Kafka Streaming Source
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), properties);
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // ...未完待續
    }
}

上面這段程式裡,我們先取得一個 StreamExecutionEnvironment 。這個 env 是必要的,是 Flink 的起手式。隨著我們要寫 Batch 或是使用 TableApi 會有點差異,但概念是相同的。

我們是將 source 、sink 註冊到這個 env 裡面再去啟動的,就跟 Airflow 必須要有 DAG 一樣。

env 也可以設定自動儲存,多久存一次,你可以視需求打開。

由於 Kafka 是很熱門的工具,所以 Flink 其實有官方套件支援。不過它並沒有預先整合進 Flink,你需要在 POM 檔內設定 denpency ,詳情可以參考這篇 Kafka | Apache Flink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>

如果使用 Kafka source,flink-connector-base 也需要包含在依赖中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.17.1</version>
</dependency>

然後,不要忘記 DataStream<String> kafkaStream = env.addSource(kafkaConsumer); 你的 source 都要加到 env 內。

轉換資料

在上面拿到了 source 之後,我們就可以接著做一些 operator 的處理,像是 flatMap ,會將一個輸入轉換成多個輸出。

public static void main(String[] args) throws Exception {
      // ...接續
  
    	// 使用 TimeWindows,這裡每5秒執行一次
        DataStream<Tuple2<String, Integer>> windowedStream = kafkaStream
            .flatMap(new Tokenizer())
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .sum(1);
      // ...未完待續
}

// Tokenizer用於將輸入字串拆分
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // 依空格拆分,生成 (key, 1) 
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

接下來我們建一個 class 去 implement FlatMapFunction 這個介面。實作可以依照需求,我們這裡就假設將一句話拆成單字,並將每個單字轉換成 Tuple2 來丟到 Collector 裡。

Tuple2 是 Flink 提供的彈性物件,用於將不同屬性的值打包在一起。畢竟 Java 是蠻硬的語言,在 ETL 過程中硬要轉成一個 class 不一定有好處,但還是可以依需求而定。

至於 new Tuple2<>(word, 1) 後面是 1,就是要用 Map-Reduce 的概念去理解,每個單字自帶一個數字的話,後面要累加或計數比較輕鬆。

接下來 keyBy(0) 代表我們拿 tuple 內第 0 個元素 (word) 做為分組的 key,所以下一行的 window 就會將相同的 word 放在同一個通道內去計算。因為在 sum(1) 的時候就會拿 tuple 的第 1 個元素 (1) 來累加。

在這裡也許有人會問, String[] words = value.split(" "); 的 words 可能會有重覆的單字出現,為什麼不自己先累加呢?例如出現了兩個 as ,就能輸出 new Tuple2<>(word,2) 這樣。

這不是不行,但增加了程式複雜度,而且沒提升多少效能。Flink 擅長處理大量的資料,只要你的硬體足夠就行。這種情況下自己生成一個 map 去 for loop 加總後再輸出,反而浪費了記憶體去重覆生成 map 再釋放掉。

Sink

最後,我們先簡單設定一下 redis,然後 import RedisSink 。這個也並沒有被包在 Flink 內,一樣要另外在 pom 檔設定。詳情可以參考 Apache Flink Streaming Connector for Redis

<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

而且因為 Redis 特性,它不太可能當作 streaming 的 source,但做為 sink 是很好的。


public static void main(String[] args) throws Exception {
      // ...接續
  
    	// Redis配置
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
            .setHost("localhost")
            .setPort(6379)
            .build();

        // 建立 RedisSink
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisExampleMapper());

        // 將 RedisSink 做為窗口處理後的流
        windowedStream.addSink(redisSink);

      // ...未完待續
}

// RedisExampleMapper用於將Tuple2寫入Redis
public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // 使用HSET命令将(key, value)寫入Redis
        return new RedisCommandDescription(RedisCommand.HSET, "word_count");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        // 使用單字作為Redis Hash Table 的 Key
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        // 使用總數作为Redis Hash Table 的 value
        return data.f1.toString();
    }
}

由於 Redis 基本上是 key-value 的大表,跟我們現在的資料流是一樣的 (tuple),因此我們很簡單的將 f0 當 key,f1 做為 value 就好了。

最後,也要記得將這個 sink 接到前面的資料流後面。

執行 env

其實就一行

public static void main(String[] args) throws Exception {
      // ...接續
  
    	env.execute("Kafka to Redis Example");
}

還記得一開始我們建立了 env 後,加入了 source 跟中間的 operator 嗎?最後的最後要記得呼叫 env.execute() ,這樣 Flink 才會真的去啟動這個 job。

如此一來,一個基本的 Flink streaming job 就完成了。雖然有點小複雜,但熟悉之後其實很好做到許多特殊需求。而且因為 streaming 的概念是無窮無盡的資料流,所以你也不太需要管理排程,因為其實無所謂排程。


上一篇
Flink 存檔跟還原機制 - Day21
下一篇
Flink Streaming 與 JDBC - Day23
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言